返回 引气・Java 气海初拓

14重走我的Java Day14 :网络编程:从“聊天室玩具”到“百万并发网关”的惊魂一夜

博主
大约 27 分钟

重走我的Java Day14 :网络编程:从“聊天室玩具”到“百万并发网关”的惊魂一夜

凌晨3点,我的手机被打爆了:“你的聊天室炸了,同时在线人数超过10万就完全瘫痪!”我盯着那个用ServerSocket.accept()写出的玩具代码,冷汗直流。那一夜,我从零开始重写,直到天亮才明白:网络编程不是Socket调用的艺术,而是并发与可靠性的生死较量。

开篇:那个被10万用户冲垮的“高性能”聊天室

事故现场代码

image-20260201224915393

java

// 我引以为傲的“高性能”聊天服务器 - 能处理1000并发!
public class ChatServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8888);
        System.out.println("服务器启动,端口8888");
        
        while (true) {
            // 问题1:同步阻塞,一个连接处理完才能接受下一个
            Socket clientSocket = serverSocket.accept();
            
            // 问题2:为每个连接创建新线程,不限制数量
            new Thread(() -> {
                try {
                    handleClient(clientSocket);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    
    private static void handleClient(Socket socket) throws IOException {
        BufferedReader in = new BufferedReader(
            new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        
        String message;
        while ((message = in.readLine()) != null) {
            // 问题3:广播消息时遍历所有连接,没有优化
            broadcast(message);
        }
    }
    
    private static void broadcast(String message) {
        // 同步遍历所有客户端,一个慢就全慢
        for (PrintWriter writer : allClients) {
            writer.println(message);
        }
    }
}

事故分析

  1. C10K问题:10,000个连接需要10,000个线程,每个线程默认栈大小1MB → 10GB内存!
  2. 线程创建销毁开销:频繁连接断开导致大量线程创建销毁
  3. 广播风暴:每次消息都要遍历所有连接,O(n)复杂度
  4. 没有背压控制:客户端发送太快会压垮服务器
  5. 没有心跳机制:死连接无法检测,浪费资源

一、TCP:从“可靠传输”到“生产级服务”

1.1 TCP三次握手的真实代价

image-20260201224935221

我曾以为TCP连接就是new Socket()那么简单,直到我在生产环境遇到SYN Flood攻击:

java

// 有漏洞的服务端代码
public class VulnerableServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8888);
        
        // 问题:没有限制未完成连接队列的大小
        while (true) {
            Socket socket = serverSocket.accept();
            // 立即创建线程处理,资源消耗大
            new Thread(new Handler(socket)).start();
        }
    }
}

// 防御性TCP服务器
public class DefensiveTCPServer {
    private static final int BACKLOG_SIZE = 50; // 半连接队列+全连接队列
    private static final int SO_TIMEOUT = 5000; // 5秒超时
    private static final int BUFFER_SIZE = 8192; // 8KB缓冲区
    
    public void start() throws IOException {
        // 1. 设置合理的backlog
        ServerSocket serverSocket = new ServerSocket(8888, BACKLOG_SIZE);
        
        // 2. 设置TCP参数
        serverSocket.setReuseAddress(true); // 允许重用TIME_WAIT端口
        serverSocket.setSoTimeout(SO_TIMEOUT); // accept超时
        
        System.out.println("服务器启动,监听端口: " + serverSocket.getLocalPort());
        System.out.println("接收缓冲区大小: " + serverSocket.getReceiveBufferSize());
        System.out.println("发送缓冲区大小: " + serverSocket.getSendBufferSize());
        
        // 3. 使用线程池限制并发
        ExecutorService executor = new ThreadPoolExecutor(
            10, 100, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new NamedThreadFactory("tcp-worker"),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        // 4. 监控连接数
        AtomicInteger connectionCount = new AtomicInteger(0);
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("当前连接数: " + connectionCount.get());
        }, 1, 1, TimeUnit.SECONDS);
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Socket socket = serverSocket.accept();
                
                // 5. 限制最大连接数
                if (connectionCount.get() >= 1000) {
                    System.out.println("连接数达到上限,拒绝新连接");
                    socket.close();
                    continue;
                }
                
                connectionCount.incrementAndGet();
                
                // 6. 设置socket参数
                configureSocket(socket);
                
                // 7. 提交任务到线程池
                executor.submit(() -> {
                    try {
                        handleConnection(socket);
                    } finally {
                        connectionCount.decrementAndGet();
                    }
                });
                
            } catch (SocketTimeoutException e) {
                // accept超时,继续循环,允许检查中断
                System.out.println("accept超时,继续等待...");
            } catch (IOException e) {
                System.err.println("接受连接失败: " + e.getMessage());
            }
        }
    }
    
    private void configureSocket(Socket socket) throws SocketException {
        // 设置读取超时
        socket.setSoTimeout(30000); // 30秒
        
        // 开启TCP_NODELAY,禁用Nagle算法(小数据包立即发送)
        socket.setTcpNoDelay(true);
        
        // 开启SO_KEEPALIVE,检测死连接
        socket.setKeepAlive(true);
        
        // 设置缓冲区大小
        socket.setReceiveBufferSize(BUFFER_SIZE);
        socket.setSendBufferSize(BUFFER_SIZE);
        
        // 设置SO_LINGER,关闭时等待数据发送
        socket.setSoLinger(true, 5); // 等待5秒
        
        // 开启OOBINLINE(带外数据)
        socket.setOOBInline(true);
    }
    
    private void handleConnection(Socket socket) {
        try (socket;
             InputStream in = socket.getInputStream();
             OutputStream out = socket.getOutputStream()) {
            
            // 使用缓冲流提高性能
            BufferedInputStream bis = new BufferedInputStream(in, BUFFER_SIZE);
            BufferedOutputStream bos = new BufferedOutputStream(out, BUFFER_SIZE);
            
            byte[] buffer = new byte[BUFFER_SIZE];
            int bytesRead;
            
            while ((bytesRead = bis.read(buffer)) != -1) {
                // 处理数据
                processData(buffer, bytesRead, bos);
                
                // 检查连接是否活跃
                if (socket.isClosed() || !socket.isConnected()) {
                    break;
                }
            }
            
        } catch (SocketTimeoutException e) {
            System.out.println("读取超时,关闭连接");
        } catch (IOException e) {
            System.err.println("连接异常: " + e.getMessage());
        }
    }
}

1.2 TCP粘包/拆包:从坑里爬出来的经验

我第一次遇到TCP粘包时,以为数据丢失了:

java

// 错误示例:简单读取,会粘包
public class NaiveTCPClient {
    public void sendMessages(Socket socket, List<String> messages) throws IOException {
        OutputStream out = socket.getOutputStream();
        
        for (String msg : messages) {
            out.write(msg.getBytes());
            // 问题:没有分隔符,多条消息会粘在一起
        }
    }
}

// 解决方案1:定长消息
public class FixedLengthProtocol {
    private static final int MESSAGE_LENGTH = 100;
    
    public void sendMessage(Socket socket, String message) throws IOException {
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        
        // 确保消息长度固定
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        if (data.length > MESSAGE_LENGTH) {
            throw new IllegalArgumentException("消息太长");
        }
        
        // 填充到固定长度
        byte[] padded = new byte[MESSAGE_LENGTH];
        System.arraycopy(data, 0, padded, 0, data.length);
        
        dos.write(padded);
        dos.flush();
    }
    
    public String receiveMessage(Socket socket) throws IOException {
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        byte[] buffer = new byte[MESSAGE_LENGTH];
        
        // 读取固定长度
        dis.readFully(buffer);
        return new String(buffer, StandardCharsets.UTF_8).trim();
    }
}

// 解决方案2:分隔符协议
public class DelimiterProtocol {
    private static final byte DELIMITER = '\n'; // 换行符作为分隔符
    
    public void sendMessage(Socket socket, String message) throws IOException {
        OutputStream out = socket.getOutputStream();
        out.write(message.getBytes(StandardCharsets.UTF_8));
        out.write(DELIMITER); // 写入分隔符
        out.flush();
    }
    
    public String receiveMessage(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        
        int b;
        while ((b = in.read()) != -1) {
            if (b == DELIMITER) {
                break; // 遇到分隔符,消息结束
            }
            buffer.write(b);
        }
        
        return buffer.toString(StandardCharsets.UTF_8.name());
    }
}

// 解决方案3:长度前缀协议(最推荐)
public class LengthPrefixProtocol {
    public void sendMessage(Socket socket, String message) throws IOException {
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        
        // 先发送长度(4字节整数)
        dos.writeInt(data.length);
        
        // 再发送数据
        dos.write(data);
        dos.flush();
    }
    
    public String receiveMessage(Socket socket) throws IOException {
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        
        // 先读取长度
        int length = dis.readInt();
        if (length <= 0 || length > 1024 * 1024) { // 限制1MB
            throw new IOException("无效消息长度: " + length);
        }
        
        // 根据长度读取数据
        byte[] buffer = new byte[length];
        dis.readFully(buffer);
        
        return new String(buffer, StandardCharsets.UTF_8);
    }
}

// 解决方案4:使用现成的协议(如Protobuf、MsgPack)
public class ProtobufProtocol {
    public void sendProtobufMessage(Socket socket, Message message) throws IOException {
        // 使用Protobuf序列化
        byte[] data = message.toByteArray();
        
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
        dos.writeInt(data.length);
        dos.write(data);
        dos.flush();
    }
    
    public Message receiveProtobufMessage(Socket socket) throws IOException {
        DataInputStream dis = new DataInputStream(socket.getInputStream());
        int length = dis.readInt();
        
        byte[] buffer = new byte[length];
        dis.readFully(buffer);
        
        // 使用Protobuf反序列化
        return Message.parseFrom(buffer);
    }
}

二、UDP:从"不可靠"到"准可靠"的进化

image-20260201224953109

2.1 UDP的真实性能陷阱

我以为UDP就是快,直到看到这个性能测试:

java

public class UDPPerformanceTest {
    // 错误:频繁创建DatagramPacket
    public void sendMessagesBad(DatagramSocket socket, InetAddress address, int port, 
                                List<String> messages) throws IOException {
        for (String msg : messages) {
            byte[] data = msg.getBytes(); // 每次创建新数组
            DatagramPacket packet = new DatagramPacket(data, data.length, address, port);
            socket.send(packet); // 每次系统调用
        }
    }
    
    // 优化:重用DatagramPacket和缓冲区
    public void sendMessagesGood(DatagramSocket socket, InetAddress address, int port,
                                 List<String> messages) throws IOException {
        // 重用缓冲区(根据MTU设置,通常是1500-减去IP和UDP头)
        byte[] buffer = new byte[1472]; // 1500 - 20(IP) - 8(UDP)
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length, address, port);
        
        for (String msg : messages) {
            byte[] data = msg.getBytes(StandardCharsets.UTF_8);
            
            // 检查长度
            if (data.length > buffer.length) {
                throw new IOException("消息太长: " + data.length);
            }
            
            // 重用packet,只更新数据
            System.arraycopy(data, 0, buffer, 0, data.length);
            packet.setLength(data.length);
            socket.send(packet);
        }
    }
    
    // 批量发送进一步优化
    public void sendBatched(DatagramSocket socket, InetAddress address, int port,
                            List<String> messages, int batchSize) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        
        for (int i = 0; i < messages.size(); i++) {
            byte[] data = messages.get(i).getBytes(StandardCharsets.UTF_8);
            dos.writeInt(data.length);
            dos.write(data);
            
            // 达到批量大小或最后一条消息时发送
            if ((i + 1) % batchSize == 0 || i == messages.size() - 1) {
                byte[] batchData = baos.toByteArray();
                DatagramPacket packet = new DatagramPacket(batchData, batchData.length, 
                                                          address, port);
                socket.send(packet);
                
                // 重置缓冲区
                baos.reset();
            }
        }
    }
}

2.2 在UDP上实现可靠传输

我在游戏项目中需要UDP的实时性,又需要TCP的可靠性:

java

// 可靠UDP协议实现(简化版)
public class ReliableUDP {
    private static final int MAX_PACKET_SIZE = 1400; // 考虑MTU
    private static final int WINDOW_SIZE = 32; // 滑动窗口大小
    private static final int TIMEOUT_MS = 100; // 超时时间
    
    // 数据包头
    private static class PacketHeader {
        int sequence;      // 序列号
        int ack;           // 确认号
        int ackBitfield;   // 累计确认位图
        long timestamp;    // 时间戳
    }
    
    // 发送方状态
    private class SenderState {
        private final Map<Integer, SentPacket> sentPackets = new ConcurrentHashMap<>();
        private final AtomicInteger nextSequence = new AtomicInteger(0);
        private final AtomicInteger windowStart = new AtomicInteger(0);
        
        void sendPacket(DatagramSocket socket, InetAddress address, int port, 
                       byte[] data) throws IOException {
            int seq = nextSequence.getAndIncrement();
            
            // 构建数据包
            ByteBuffer buffer = ByteBuffer.allocate(MAX_PACKET_SIZE);
            buffer.putInt(seq); // 序列号
            buffer.putInt(0);   // ack(接收方填充)
            buffer.putInt(0);   // ackBitfield
            buffer.putLong(System.currentTimeMillis()); // 时间戳
            buffer.put(data);
            
            byte[] packetData = new byte[buffer.position()];
            buffer.rewind();
            buffer.get(packetData);
            
            // 发送
            DatagramPacket packet = new DatagramPacket(packetData, packetData.length, 
                                                      address, port);
            socket.send(packet);
            
            // 记录已发送的数据包
            sentPackets.put(seq, new SentPacket(packetData, System.currentTimeMillis()));
            
            // 检查窗口是否满
            while (seq - windowStart.get() >= WINDOW_SIZE) {
                // 窗口已满,等待确认
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        void processAck(int ackSeq, int ackBitfield) {
            // 更新窗口起始位置
            int currentStart = windowStart.get();
            if (ackSeq > currentStart) {
                windowStart.set(ackSeq);
            }
            
            // 根据位图确认更多数据包
            for (int i = 0; i < 32; i++) {
                if ((ackBitfield & (1 << i)) != 0) {
                    int seq = ackSeq - i - 1;
                    if (seq >= currentStart) {
                        sentPackets.remove(seq);
                    }
                }
            }
            
            // 移除确认的数据包
            sentPackets.keySet().removeIf(seq -> seq < windowStart.get());
        }
        
        void checkTimeouts() {
            long now = System.currentTimeMillis();
            sentPackets.forEach((seq, packet) -> {
                if (now - packet.sendTime > TIMEOUT_MS) {
                    // 超时重传
                    try {
                        DatagramPacket dp = new DatagramPacket(packet.data, packet.data.length,
                                                              socket.getInetAddress(), 
                                                              socket.getPort());
                        socket.send(dp);
                        packet.sendTime = now; // 更新发送时间
                    } catch (IOException e) {
                        System.err.println("重传失败: " + e.getMessage());
                    }
                }
            });
        }
    }
    
    // 接收方状态
    private class ReceiverState {
        private final TreeSet<Integer> receivedSequences = new TreeSet<>();
        private int expectedSequence = 0;
        
        void processPacket(byte[] data, DatagramSocket socket, 
                          InetAddress senderAddress, int senderPort) throws IOException {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int seq = buffer.getInt();
            
            // 检查是否是重复包
            if (receivedSequences.contains(seq)) {
                // 重复包,只发送ACK
                sendAck(socket, senderAddress, senderPort, seq);
                return;
            }
            
            // 按序到达
            if (seq == expectedSequence) {
                receivedSequences.add(seq);
                expectedSequence++;
                
                // 处理可能的乱序包
                while (receivedSequences.contains(expectedSequence)) {
                    receivedSequences.remove(expectedSequence);
                    expectedSequence++;
                }
                
                // 处理数据
                buffer.getInt(); // 跳过ack字段
                buffer.getInt(); // 跳过ackBitfield
                buffer.getLong(); // 跳过时间戳
                
                int dataLength = data.length - 20; // 减去头部20字节
                byte[] payload = new byte[dataLength];
                buffer.get(payload);
                
                deliverData(payload);
                
            } else if (seq > expectedSequence) {
                // 乱序到达,先存储
                receivedSequences.add(seq);
            }
            // seq < expectedSequence 是重复包,上面已处理
            
            // 发送ACK
            sendAck(socket, senderAddress, senderPort, expectedSequence - 1);
        }
        
        private void sendAck(DatagramSocket socket, InetAddress address, int port, 
                            int ackSeq) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(12); // 只发送ACK头
            buffer.putInt(-1); // 序列号-1表示这是ACK包
            buffer.putInt(ackSeq); // 确认号
            
            // 构建累计确认位图
            int ackBitfield = 0;
            for (int seq : receivedSequences) {
                if (seq < ackSeq && seq >= ackSeq - 32) {
                    int offset = ackSeq - seq - 1;
                    ackBitfield |= (1 << offset);
                }
            }
            buffer.putInt(ackBitfield);
            
            byte[] ackData = buffer.array();
            DatagramPacket ackPacket = new DatagramPacket(ackData, ackData.length, 
                                                         address, port);
            socket.send(ackPacket);
        }
    }
}

三、NIO:从"阻塞地狱"到"高并发天堂"

3.1 Selector的魔法

当我第一次理解Selector时,感觉像发现了新大陆:

image-20260201225012172

java

public class NIOServer {
    private final Selector selector;
    private final ByteBuffer buffer = ByteBuffer.allocate(8192);
    private final Map<SocketChannel, ClientSession> sessions = new ConcurrentHashMap<>();
    
    public NIOServer(int port) throws IOException {
        // 1. 创建ServerSocketChannel并配置为非阻塞
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(port));
        
        // 2. 创建Selector
        selector = Selector.open();
        
        // 3. 注册ServerSocketChannel到Selector,监听ACCEPT事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        System.out.println("NIO服务器启动,端口: " + port);
    }
    
    public void start() throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            // 4. 阻塞等待事件,超时1秒
            int readyChannels = selector.select(1000);
            
            if (readyChannels == 0) {
                continue; // 超时,继续循环
            }
            
            // 5. 处理就绪的事件
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove(); // 必须移除!
                
                try {
                    if (key.isValid()) {
                        if (key.isAcceptable()) {
                            handleAccept(key);
                        } else if (key.isReadable()) {
                            handleRead(key);
                        } else if (key.isWritable()) {
                            handleWrite(key);
                        } else if (key.isConnectable()) {
                            handleConnect(key);
                        }
                    }
                } catch (IOException e) {
                    // 连接异常,关闭通道
                    key.cancel();
                    if (key.channel() != null) {
                        key.channel().close();
                    }
                }
            }
        }
    }
    
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        
        if (clientChannel != null) {
            // 配置为非阻塞
            clientChannel.configureBlocking(false);
            
            // 注册读事件
            clientChannel.register(selector, SelectionKey.OP_READ);
            
            // 创建会话
            ClientSession session = new ClientSession(clientChannel);
            sessions.put(clientChannel, session);
            
            System.out.println("新连接: " + clientChannel.getRemoteAddress());
        }
    }
    
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ClientSession session = sessions.get(channel);
        
        buffer.clear();
        int bytesRead;
        
        try {
            bytesRead = channel.read(buffer);
        } catch (IOException e) {
            // 连接断开
            closeConnection(channel);
            return;
        }
        
        if (bytesRead == -1) {
            // 客户端主动关闭
            closeConnection(channel);
            return;
        }
        
        if (bytesRead > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            
            // 处理数据
            session.processData(data);
            
            // 如果需要回写,注册写事件
            if (session.hasDataToWrite()) {
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
        }
    }
    
    private void handleWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ClientSession session = sessions.get(channel);
        
        ByteBuffer writeBuffer = session.getWriteBuffer();
        
        while (writeBuffer.hasRemaining()) {
            int bytesWritten = channel.write(writeBuffer);
            if (bytesWritten == 0) {
                // 写缓冲区满,下次再写
                break;
            }
        }
        
        if (!writeBuffer.hasRemaining()) {
            // 所有数据已写完,取消写事件监听
            key.interestOps(SelectionKey.OP_READ);
            session.writeComplete();
        }
    }
    
    private void closeConnection(SocketChannel channel) throws IOException {
        ClientSession session = sessions.remove(channel);
        if (session != null) {
            session.close();
        }
        
        channel.close();
        System.out.println("连接关闭: " + channel.getRemoteAddress());
    }
    
    // 客户端会话管理
    private static class ClientSession {
        private final SocketChannel channel;
        private final ByteBuffer writeBuffer = ByteBuffer.allocate(8192);
        private final Queue<byte[]> writeQueue = new ConcurrentLinkedQueue<>();
        
        ClientSession(SocketChannel channel) {
            this.channel = channel;
        }
        
        void processData(byte[] data) {
            // 处理接收到的数据
            System.out.println("收到数据: " + new String(data, StandardCharsets.UTF_8));
            
            // 简单回声
            writeQueue.add(("回声: " + new String(data, StandardCharsets.UTF_8)).getBytes());
        }
        
        boolean hasDataToWrite() {
            return !writeQueue.isEmpty();
        }
        
        ByteBuffer getWriteBuffer() {
            writeBuffer.clear();
            
            byte[] data;
            while ((data = writeQueue.poll()) != null) {
                if (writeBuffer.remaining() >= data.length) {
                    writeBuffer.put(data);
                } else {
                    // 放不下了,放回队列
                    writeQueue.add(data);
                    break;
                }
            }
            
            writeBuffer.flip();
            return writeBuffer;
        }
        
        void writeComplete() {
            // 写入完成回调
        }
        
        void close() {
            // 清理资源
        }
    }
}

3.2 Netty:生产级的网络框架

当我最终放弃手写NIO,转向Netty时,生产力提升了10倍:

java

// 使用Netty实现高性能Echo服务器
public class NettyEchoServer {
    private final int port;
    
    public NettyEchoServer(int port) {
        this.port = port;
    }
    
    public void start() throws Exception {
        // 1. 创建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 接收连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理I/O
        
        try {
            // 2. 创建ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // 使用NIO传输
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     
                     // 添加解码器(处理粘包/拆包)
                     pipeline.addLast(new LengthFieldBasedFrameDecoder(
                         Integer.MAX_VALUE, 0, 4, 0, 4));
                     
                     // 添加编码器
                     pipeline.addLast(new LengthFieldPrepender(4));
                     
                     // 添加字符串编解码器
                     pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
                     pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
                     
                     // 添加业务处理器
                     pipeline.addLast(new EchoServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128) // 连接队列大小
             .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接
             .childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle
             .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 接收缓冲区
             .childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 发送缓冲区
            
            // 3. 绑定端口,启动服务器
            ChannelFuture f = b.bind().sync();
            System.out.println("Netty服务器启动,端口: " + port);
            
            // 4. 等待服务器关闭
            f.channel().closeFuture().sync();
        } finally {
            // 5. 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    // 业务处理器
    @Sharable
    private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
        // 消息计数器
        private final AtomicLong messageCount = new AtomicLong();
        private final AtomicLong totalBytes = new AtomicLong();
        
        // 定时打印统计信息
        public EchoServerHandler() {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                long count = messageCount.get();
                long bytes = totalBytes.get();
                System.out.printf("统计: 消息数=%d, 总字节数=%d, 平均大小=%.2f\n",
                    count, bytes, count > 0 ? (double)bytes / count : 0);
            }, 1, 1, TimeUnit.SECONDS);
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = (String) msg;
            
            // 更新统计
            messageCount.incrementAndGet();
            totalBytes.addAndGet(message.length());
            
            // 回声
            ctx.write(message);
            
            // 流量控制:如果积压太多,暂停读取
            if (ctx.channel().unsafe().outboundBuffer().size() > 65536) {
                ctx.channel().config().setAutoRead(false);
                
                // 当缓冲区小于一半时恢复读取
                ctx.channel().eventLoop().schedule(() -> {
                    if (ctx.channel().isActive()) {
                        ctx.channel().config().setAutoRead(true);
                    }
                }, 100, TimeUnit.MILLISECONDS);
            }
        }
        
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            // 刷新到socket
            ctx.flush();
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // 记录异常并关闭连接
            System.err.println("连接异常: " + cause.getMessage());
            ctx.close();
        }
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("新连接: " + ctx.channel().remoteAddress());
            
            // 设置连接属性
            ctx.channel().attr(AttributeKey.valueOf("connectTime")).set(System.currentTimeMillis());
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("连接断开: " + ctx.channel().remoteAddress());
            
            Long connectTime = ctx.channel().attr(AttributeKey.<Long>valueOf("connectTime")).get();
            if (connectTime != null) {
                long duration = System.currentTimeMillis() - connectTime;
                System.out.println("连接持续时间: " + duration + "ms");
            }
        }
    }
}

四、实战:百万并发网关设计

基于那次10万并发的事故,我设计了新的网关系统:

image-20260201225022617

java

// 生产级网关架构
public class GatewayServer {
    private final int port;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final EventLoopGroup businessGroup;
    private Channel serverChannel;
    
    // 监控指标
    private final AtomicInteger connectionCount = new AtomicInteger();
    private final AtomicLong requestCount = new AtomicLong();
    private final AtomicLong totalLatency = new AtomicLong();
    
    public GatewayServer(int port) {
        this.port = port;
        
        // 根据CPU核心数设置线程数
        int cores = Runtime.getRuntime().availableProcessors();
        
        // Boss线程组:接收连接
        bossGroup = new NioEventLoopGroup(1);
        
        // Worker线程组:处理I/O
        workerGroup = new NioEventLoopGroup(cores * 2);
        
        // 业务线程组:处理业务逻辑(避免阻塞I/O线程)
        businessGroup = new DefaultEventLoopGroup(cores * 4);
    }
    
    public void start() throws Exception {
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new GatewayChannelInitializer(businessGroup))
             .option(ChannelOption.SO_BACKLOG, 10240) // 10k backlog
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .childOption(ChannelOption.TCP_NODELAY, true)
             .childOption(ChannelOption.SO_RCVBUF, 64 * 1024)
             .childOption(ChannelOption.SO_SNDBUF, 64 * 1024)
             .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                 new WriteBufferWaterMark(32 * 1024, 64 * 1024));
            
            // 绑定端口
            ChannelFuture f = b.bind().sync();
            serverChannel = f.channel();
            
            System.out.println("网关启动成功,端口: " + port);
            System.out.println("CPU核心数: " + Runtime.getRuntime().availableProcessors());
            System.out.println("最大文件描述符: " + getMaxFileDescriptors());
            
            // 启动监控
            startMonitoring();
            
            // 等待关闭
            serverChannel.closeFuture().sync();
        } finally {
            shutdown();
        }
    }
    
    private class GatewayChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final EventLoopGroup businessGroup;
        
        public GatewayChannelInitializer(EventLoopGroup businessGroup) {
            this.businessGroup = businessGroup;
        }
        
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            
            // 1. 空闲检测(30秒无读写关闭连接)
            pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
            
            // 2. 心跳处理器
            pipeline.addLast(new HeartbeatHandler());
            
            // 3. 流量整形(限制每个连接速率)
            pipeline.addLast(new ChannelTrafficShapingHandler(1024 * 1024, 1024 * 1024));
            
            // 4. 解码器(自定义协议)
            pipeline.addLast(new GatewayDecoder());
            
            // 5. 编码器
            pipeline.addLast(new GatewayEncoder());
            
            // 6. 业务处理器(在业务线程组执行,避免阻塞I/O)
            pipeline.addLast(businessGroup, new GatewayBusinessHandler());
            
            // 7. 异常处理器
            pipeline.addLast(new GatewayExceptionHandler());
        }
    }
    
    // 心跳处理器
    private static class HeartbeatHandler extends ChannelInboundHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(
            Unpooled.copiedBuffer("HEARTBEAT", StandardCharsets.UTF_8));
        
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                if (e.state() == IdleState.READER_IDLE) {
                    // 发送心跳
                    ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                       .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            }
        }
    }
    
    // 业务处理器
    private class GatewayBusinessHandler extends SimpleChannelInboundHandler<GatewayRequest> {
        private final Random random = new Random();
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, GatewayRequest request) {
            long startTime = System.nanoTime();
            
            try {
                // 更新请求计数
                requestCount.incrementAndGet();
                
                // 模拟业务处理(随机延迟10-100ms)
                int delay = 10 + random.nextInt(90);
                TimeUnit.MILLISECONDS.sleep(delay);
                
                // 构建响应
                GatewayResponse response = new GatewayResponse(
                    request.getId(),
                    "OK",
                    System.currentTimeMillis()
                );
                
                // 写回响应
                ctx.writeAndFlush(response);
                
                // 计算延迟
                long latency = System.nanoTime() - startTime;
                totalLatency.addAndGet(latency);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                ctx.writeAndFlush(new GatewayResponse(
                    request.getId(),
                    "ERROR",
                    System.currentTimeMillis()
                ));
            }
        }
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            int count = connectionCount.incrementAndGet();
            System.out.println("新连接,当前连接数: " + count);
            
            // 限制最大连接数
            if (count > 100000) {
                System.out.println("连接数超过限制,关闭新连接");
                ctx.close();
            }
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            int count = connectionCount.decrementAndGet();
            System.out.println("连接断开,当前连接数: " + count);
        }
    }
    
    private void startMonitoring() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        
        // 每5秒打印一次统计信息
        scheduler.scheduleAtFixedRate(() -> {
            int connections = connectionCount.get();
            long requests = requestCount.get();
            long latency = totalLatency.get();
            
            double qps = requests / 5.0; // 每秒请求数
            double avgLatency = requests > 0 ? latency / (requests * 1_000_000.0) : 0; // 毫秒
            
            System.out.printf("监控 - 连接数: %d, QPS: %.2f, 平均延迟: %.2fms\n",
                connections, qps, avgLatency);
            
            // 重置计数器
            requestCount.set(0);
            totalLatency.set(0);
            
        }, 5, 5, TimeUnit.SECONDS);
    }
    
    private long getMaxFileDescriptors() {
        try {
            Class<?> mgmtFactory = Class.forName("java.lang.management.ManagementFactory");
            Method getPlatformMXBeans = mgmtFactory.getMethod("getPlatformMXBeans", Class.class);
            List<?> beans = (List<?>) getPlatformMXBeans.invoke(null, 
                Class.forName("com.sun.management.OperatingSystemMXBean"));
            
            if (!beans.isEmpty()) {
                Object osBean = beans.get(0);
                Method getMaxFileDescriptorCount = osBean.getClass()
                    .getMethod("getMaxFileDescriptorCount");
                return (long) getMaxFileDescriptorCount.invoke(osBean);
            }
        } catch (Exception e) {
            // 忽略
        }
        return -1;
    }
    
    public void shutdown() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        
        if (businessGroup != null) {
            businessGroup.shutdownGracefully();
        }
        
        System.out.println("网关已关闭");
    }
}

经验总结:我的网络编程检查清单

性能检查清单

  • 是否使用连接池复用TCP连接?
  • 是否设置合理的TCP缓冲区大小?
  • 是否禁用Nagle算法(TCP_NODELAY)?
  • 是否启用TCP快速打开(TCP_FASTOPEN)?
  • 是否使用多路复用(NIO/Netty)而非阻塞I/O?

可靠性检查清单

  • 是否有心跳机制检测死连接?
  • 是否有重连机制?
  • 是否处理了TCP粘包/拆包?
  • 是否有超时和重试机制?
  • 是否有流量控制和背压机制?

安全性检查清单

  • 是否验证客户端身份?
  • 是否限制连接数和请求速率?
  • 是否使用TLS加密通信?
  • 是否防止DDoS攻击(如SYN Flood)?
  • 是否有完整的访问日志?

可维护性检查清单

  • 是否有完善的监控指标?
  • 是否有连接跟踪和诊断工具?
  • 是否支持热配置更新?
  • 是否有优雅的关闭和重启机制?
  • 代码是否易于测试和模拟?

最后的真相

那次10万并发的崩溃,虽然让我经历了最艰难的一夜,但也让我学到了:

网络编程的本质不是Socket API的调用,而是对并发、可靠性和资源管理的深刻理解。

我现在遵循的原则:

  1. 永远不要相信网络:网络是不可靠的,必须设计重试、超时和容错机制
  2. 资源是有限的:文件描述符、内存、线程都是有限的,必须合理管理
  3. 监控比功能更重要:看不到的系统是最危险的系统
  4. 简单比复杂更可靠:复杂的协议难以调试,简单的协议容易维护

记住:在网络编程中,正确处理失败比处理成功更重要。一个能优雅处理失败的简单系统,远比一个功能丰富但脆弱的复杂系统更有价值。

知识点测试

读完文章了?来测试一下你对知识点的掌握程度吧!

评论区

使用 GitHub 账号登录后即可发表评论,支持 Markdown 格式。

如果评论系统无法加载,请确保:

  • 您的网络可以访问 GitHub
  • giscus GitHub App 已安装到仓库
  • 仓库已启用 Discussions 功能